Airlift (dagster-airlift)

Core (dagster_airlift.core)

AirflowInstance

class dagster_airlift.core.AirflowInstance(auth_backend, name, batch_task_instance_limit=100, batch_dag_runs_limit=100)[source]

A class that represents a running Airflow Instance and provides methods for interacting with its REST API.

Parameters:
  • auth_backend (AirflowAuthBackend) – The authentication backend to use when making requests to the Airflow instance.

  • name (str) – The name of the Airflow instance. This will be prefixed to any assets automatically created using this instance.

  • batch_task_instance_limit (int) – The number of task instances to query at a time when fetching task instances. Defaults to 100.

  • batch_dag_runs_limit (int) – The number of dag runs to query at a time when fetching dag runs. Defaults to 100.

get_run_state(dag_id, run_id)[source]

Given a run ID of an Airflow dag, return the state of that run.

Parameters:
  • dag_id (str) – The dag id.

  • run_id (str) – The run id.

Returns:

The state of the run. Will be one of the states defined by Airflow.

Return type:

str

trigger_dag(dag_id, logical_date=None)[source]

Trigger a dag run for the given dag_id.

Does not wait for the run to finish. To wait for the run to complete, use wait_for_run_completion().

Parameters:
  • dag_id (str) – The dag id.

  • logical_date (Optional[datetime]) – The logical date to use for the dag run, if any.

Returns:

The dag run id.

Return type:

str

wait_for_run_completion(dag_id, run_id, timeout=30)[source]

Given a run ID of an Airflow dag, wait for that run to reach a completed state.

Parameters:
  • dag_id (str) – The dag id.

  • run_id (str) – The run id.

  • timeout (int) – The number of seconds to wait before timing out.

Returns:

None
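
Example

A sketch of the trigger-and-wait workflow using the three methods above (assumes an af_instance constructed as in the AirflowBasicAuthBackend example below; the dag id is hypothetical):

run_id = af_instance.trigger_dag(dag_id="example_dag")

# Block until the run reaches a completed state, up to the timeout (in seconds).
af_instance.wait_for_run_completion(dag_id="example_dag", run_id=run_id, timeout=60)

# One of the states defined by Airflow, e.g. "success" or "failed".
state = af_instance.get_run_state(dag_id="example_dag", run_id=run_id)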

class dagster_airlift.core.AirflowAuthBackend[source]

An abstract class that represents an authentication backend for an Airflow instance.

Requires two methods to be implemented by subclasses:

  • get_session: Returns a requests.Session object that can be used to make requests to the Airflow instance, and handles authentication.

  • get_webserver_url: Returns the base URL of the Airflow webserver.

The dagster-airlift package provides the following default implementations:

  • dagster_airlift.core.AirflowBasicAuthBackend: An authentication backend that uses Airflow’s basic auth to authenticate with the Airflow instance.

  • dagster_airlift.mwaa.MwaaSessionAuthBackend: An authentication backend that uses AWS MWAA’s web login token to authenticate with the Airflow instance (requires dagster-airlift[mwaa]).
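
A minimal sketch of a custom backend, assuming a bearer-token header scheme (the AIRFLOW_API_TOKEN environment variable is hypothetical):

import os

import requests
from dagster_airlift.core import AirflowAuthBackend


class TokenAuthBackend(AirflowAuthBackend):
    def __init__(self, webserver_url: str) -> None:
        self._webserver_url = webserver_url

    def get_session(self) -> requests.Session:
        # Attach the token to every request made through this session.
        session = requests.Session()
        session.headers.update(
            {"Authorization": f"Bearer {os.environ['AIRFLOW_API_TOKEN']}"}
        )
        return session

    def get_webserver_url(self) -> str:
        return self._webserver_url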

class dagster_airlift.core.AirflowBasicAuthBackend(webserver_url, username, password)[source]

A dagster_airlift.core.AirflowAuthBackend that authenticates using basic auth.

Parameters:
  • webserver_url (str) – The URL of the webserver.

  • username (str) – The username to authenticate with.

  • password (str) – The password to authenticate with.

Examples

Creating an AirflowInstance using this backend.

from dagster_airlift.core import AirflowInstance, AirflowBasicAuthBackend

af_instance = AirflowInstance(
    name="my-instance",
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="https://my-webserver-hostname",
        username="my-username",
        password="my-password"
    )
)

Assets & Definitions

dagster_airlift.core.build_defs_from_airflow_instance(*, airflow_instance, defs=None, sensor_minimum_interval_seconds=1, event_transformer_fn=<function default_event_transformer>, dag_selector_fn=None)[source]

Builds a dagster.Definitions object from an Airflow instance.

For every DAG in the Airflow instance, this function will create a Dagster asset for the DAG with an asset key instance_name/dag/dag_id. It will also create a sensor that polls the Airflow instance for DAG runs and emits Dagster events for each successful run.

An optional defs argument can be provided, where the user can pass in a dagster.Definitions object containing assets which are mapped to Airflow DAGs and tasks. These assets will be enriched with metadata from the Airflow instance, and placed upstream of the automatically generated DAG assets.

An optional event_transformer_fn can be provided, which allows the user to modify the Dagster events produced by the sensor. The function takes the Dagster events produced by the sensor and returns a sequence of Dagster events.

An optional dag_selector_fn can be provided, which allows the user to filter which DAGs assets are created for. The function takes a dagster_airlift.core.serialization.serialized_data.DagInfo object and returns a boolean indicating whether the DAG should be included.

Parameters:
  • airflow_instance (AirflowInstance) – The Airflow instance to build assets and the sensor from.

  • defs (Optional[Definitions]) – A dagster.Definitions object containing assets that are mapped to Airflow DAGs and tasks.

  • sensor_minimum_interval_seconds (int) – The minimum interval in seconds between sensor runs.

  • event_transformer_fn (DagsterEventTransformerFn) – A function that allows for modifying the Dagster events produced by the sensor.

  • dag_selector_fn (Optional[DagSelectorFn]) – A function that allows for filtering which DAGs assets are created for.

Returns:

A dagster.Definitions object containing the assets and sensor.

Return type:

Definitions

Examples

Building a dagster.Definitions object from an Airflow instance.

from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    build_defs_from_airflow_instance,
)

from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
    ),
    name=AIRFLOW_INSTANCE_NAME,
)


defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)

Providing task-mapped assets to the function.

from dagster import AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
...


defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance, # same as above
    defs=Definitions(
        assets=assets_with_task_mappings(
            dag_id="rebuild_iris_models",
            task_mappings={
                "my_task": [AssetSpec("my_first_asset"), AssetSpec("my_second_asset")],
            },
        ),
    ),
)

Providing a custom event transformer function.

from typing import Sequence
from dagster import Definitions, SensorEvaluationContext
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    AssetEvent,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
    AirflowDefinitionsData,
)
...

def add_tags_to_events(
    context: SensorEvaluationContext,
    defs_data: AirflowDefinitionsData,
    events: Sequence[AssetEvent]
) -> Sequence[AssetEvent]:
    altered_events = []
    for event in events:
        altered_events.append(event._replace(tags={"my_tag": "my_value"}))
    return altered_events

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance, # same as above
    event_transformer_fn=add_tags_to_events,
)

Filtering which DAGs assets are created for.

from dagster import Definitions
from dagster_airlift.core import (
    AirflowInstance,
    AirflowBasicAuthBackend,
    AssetEvent,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
    DagInfo,
)
...

def only_include_dag(dag_info: DagInfo) -> bool:
    return dag_info.dag_id == "my_dag_id"

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance, # same as above
    dag_selector_fn=only_include_dag,
)

Mapping Dagster assets to Airflow tasks/dags:

dagster_airlift.core.assets_with_task_mappings(dag_id, task_mappings)[source]

Modify assets to be associated with a particular task in Airlift tooling.

Used in concert with build_defs_from_airflow_instance to observe an Airflow instance, monitor the tasks associated with these assets, and keep their materialization histories up to date.

Concretely this adds metadata to all asset specs in the provided definitions with the provided dag_id and task_id. The dag_id comes from the dag_id argument; the task_id comes from the key of the provided task_mappings dictionary. There is a single metadata key “airlift/task-mapping” that is used to store this information. It is a list of dictionaries with keys “dag_id” and “task_id”.

Example

from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings

@asset
def asset_one() -> None: ...

defs = Definitions(
    assets=assets_with_task_mappings(
        dag_id="dag_one",
        task_mappings={
            "task_one": [asset_one],
            "task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
        },
    )
)

dagster_airlift.core.assets_with_dag_mappings(dag_mappings)[source]

Modify assets to be associated with a particular dag in Airlift tooling.

Used in concert with build_defs_from_airflow_instance to observe an Airflow instance, monitor the dags associated with these assets, and keep their materialization histories up to date.

In contrast with assets_with_task_mappings, which maps assets on a per-task basis, this is used with dag-level mappings via proxying_to_dagster, where an entire dag is migrated at once.

Concretely this adds metadata to all asset specs in the provided definitions with the provided dag_id. The dag_id comes from the key of the provided dag_mappings dictionary. There is a single metadata key “airlift/dag-mapping” that is used to store this information. It is a list of strings, where each string is a dag_id which the asset is associated with.

Example:

from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings

@asset
def asset_one() -> None: ...

defs = Definitions(
    assets=assets_with_dag_mappings(
        dag_mappings={
            "dag_one": [asset_one],
            "dag_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
        },
    )
)

dagster_airlift.core.assets_with_multiple_task_mappings(assets, task_handles)[source]

Given an asset or assets definition, return a new asset or assets definition with metadata that indicates that it is targeted by multiple airflow tasks. An example of this would be a separate weekly and daily dag that contains a task that targets a single asset.

from dagster import Definitions, asset
from dagster_airlift.core import (
    build_defs_from_airflow_instance,
    assets_with_task_mappings,
    assets_with_multiple_task_mappings,
)

# Asset maps to a single task.
@asset
def other_asset(): ...

# Asset maps to a physical entity which is produced by two different airflow tasks.
@asset
def scheduled_twice(): ...

defs = build_defs_from_airflow_instance(
    airflow_instance=airflow_instance,
    defs=Definitions(
        assets=[
            *assets_with_task_mappings(
                dag_id="other_dag",
                task_mappings={
                    "task1": [other_asset]
                },
            ),
            *assets_with_multiple_task_mappings(
                assets=[scheduled_twice],
                task_handles=[
                    {"dag_id": "weekly_dag", "task_id": "task1"},
                    {"dag_id": "daily_dag", "task_id": "task1"},
                ],
            ),
        ]
    ),
)

Annotations for customizable components:

dagster_airlift.core.DagSelectorFn

alias of Callable[[DagInfo], bool]

dagster_airlift.core.DagsterEventTransformerFn

alias of Callable[[SensorEvaluationContext, AirflowDefinitionsData, Sequence[AssetMaterialization]], Iterable[AssetMaterialization | AssetObservation | AssetCheckEvaluation]]
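
For reference, a no-op transformer conforming to this alias might look like the following sketch:

from typing import Iterable, Sequence

from dagster import AssetMaterialization, SensorEvaluationContext
from dagster_airlift.core import AirflowDefinitionsData


def passthrough_transformer(
    context: SensorEvaluationContext,
    defs_data: AirflowDefinitionsData,
    events: Sequence[AssetMaterialization],
) -> Iterable[AssetMaterialization]:
    # Return the sensor's events unchanged; a real transformer could filter or annotate them.
    return events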

class dagster_airlift.core.TaskHandleDict[source]

A dictionary identifying a single Airflow task by its dag_id and task_id, as accepted by the task_handles argument of assets_with_multiple_task_mappings.
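
For illustration, a handle literal matching this shape (the ids are hypothetical):

from dagster_airlift.core import TaskHandleDict

handle: TaskHandleDict = {"dag_id": "daily_dag", "task_id": "task1"}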

Objects for retrieving information about the Airflow/Dagster mapping:

class dagster_airlift.core.DagInfo(*args, **kwargs)[source]

A record containing information about a given Airflow dag.

Users should not instantiate this class directly. It is provided when customizing which DAGs are included in the generated definitions using the dag_selector_fn argument of build_defs_from_airflow_instance().

metadata

The metadata associated with the dag, retrieved from the Airflow REST API: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags

Type:

Dict[str, Any]
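
For example, a dag_selector_fn might inspect the dag id or this metadata; a sketch (the prefix is hypothetical, and the is_paused key is assumed to be present as in the Airflow get_dags response):

from dagster_airlift.core import DagInfo


def select_analytics_dags(dag_info: DagInfo) -> bool:
    # Include only unpaused dags whose id starts with "analytics_".
    return dag_info.dag_id.startswith("analytics_") and not dag_info.metadata.get(
        "is_paused", False
    )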

class dagster_airlift.core.AirflowDefinitionsData(*args, **kwargs)[source]

A class that holds data about the assets that are mapped to Airflow dags and tasks, and provides methods for retrieving information about the mappings. The user should not instantiate this class directly. It is provided when customizing the events that are generated by the Airflow sensor using the event_transformer_fn argument of build_defs_from_airflow_instance().

asset_keys_in_task(dag_id, task_id)[source]

Returns the asset keys that are mapped to the given task.

Parameters:
  • dag_id (str) – The dag id.

  • task_id (str) – The task id.

task_ids_in_dag(dag_id)[source]

Returns the task ids within the given dag_id.

Parameters:

dag_id (str) – The dag id.

property instance_name

The name of the Airflow instance.
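
For example, an event_transformer_fn can consult these mappings when processing sensor events; a sketch (the dag id and tag key are hypothetical):

from typing import Sequence

from dagster import SensorEvaluationContext
from dagster_airlift.core import AirflowDefinitionsData, AssetEvent


def tag_with_instance(
    context: SensorEvaluationContext,
    defs_data: AirflowDefinitionsData,
    events: Sequence[AssetEvent],
) -> Sequence[AssetEvent]:
    # Log which tasks are mapped within a dag, then tag each event with the instance name.
    context.log.info(defs_data.task_ids_in_dag("example_dag"))
    return [
        event._replace(tags={"airflow_instance": defs_data.instance_name})
        for event in events
    ]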

MWAA (dagster_airlift.mwaa)

class dagster_airlift.mwaa.MwaaSessionAuthBackend(mwaa_session, env_name)[source]

A dagster_airlift.core.AirflowAuthBackend that authenticates to AWS MWAA.

Under the hood, this class uses the MWAA boto3 session to request a web login token and then uses the token to authenticate to the MWAA web server.

Parameters:
  • mwaa_session (boto3.Session) – The boto3 MWAA session

  • env_name (str) – The name of the MWAA environment

Examples

Creating an AirflowInstance pointed at an MWAA environment.

import boto3
from dagster_airlift.mwaa import MwaaSessionAuthBackend
from dagster_airlift.core import AirflowInstance

boto_session = boto3.Session(profile_name="my_profile", region_name="us-west-2")
af_instance = AirflowInstance(
    name="my-mwaa-instance",
    auth_backend=MwaaSessionAuthBackend(
        mwaa_session=boto_session,
        env_name="my-mwaa-env"
    )
)

In Airflow (dagster_airlift.in_airflow)

Proxying

dagster_airlift.in_airflow.proxying_to_dagster(*, global_vars, proxied_state, logger=None, build_from_task_fn=<bound method BaseProxyTaskToDagsterOperator.build_from_task of <class 'dagster_airlift.in_airflow.task_proxy_operator.DefaultProxyTaskToDagsterOperator'>>, build_from_dag_fn=<bound method DefaultProxyDAGToDagsterOperator.build_from_dag of <class 'dagster_airlift.in_airflow.dag_proxy_operator.DefaultProxyDAGToDagsterOperator'>>)[source]

Proxies tasks and dags to Dagster based on provided proxied state. Expects a dictionary of in-scope global variables to be provided (typically retrieved with globals()), and a proxied state dictionary (typically retrieved with load_proxied_state_from_yaml()) for dags in that global state. This function will modify in-place the dictionary of global variables to replace proxied tasks with appropriate Dagster operators.

In the case of task-level proxying, the proxied tasks will be replaced with new operators that are constructed by the provided build_from_task_fn. A default implementation of this function is provided in DefaultProxyTaskToDagsterOperator. In the case of dag-level proxying, the entire dag structure will be replaced with a single task that is constructed by the provided build_from_dag_fn. A default implementation of this function is provided in DefaultProxyDAGToDagsterOperator.

Parameters:
  • global_vars (Dict[str, Any]) – The global variables in the current context. In most cases, retrieved with globals() (no import required). This is equivalent to what airflow already does to introspect the dags which exist in a given module context: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#loading-dags

  • proxied_state (AirflowProxiedState) – The proxied state for the dags.

  • logger (Optional[logging.Logger]) – The logger to use. Defaults to logging.getLogger("dagster_airlift").

Examples

Typical usage of this function is to be called at the end of a dag file, retrieving proxied_state from an accompanying proxied_state path.

from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster._time import get_current_datetime_midnight
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


with DAG(
    dag_id="daily_interval_dag",
    ...,
) as daily_interval_dag:
    PythonOperator(task_id="my_task", python_callable=...)

# At the end of the dag file, so we can ensure dags are loaded into globals.
proxying_to_dagster(
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    global_vars=globals(),
)

You can also provide custom implementations of the build_from_task_fn function to customize the behavior of task-level proxying.

import os
from pathlib import Path

import requests
from airflow.models.operator import BaseOperator
from airflow.utils.context import Context
from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyTaskToDagsterOperator
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

... # Dag code here

class CustomAuthTaskProxyOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Add custom headers to the session
        session = requests.Session()
        session.headers.update({"Authorization": "Bearer my_token"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        # Use a custom environment variable for the dagster url
        return os.environ["CUSTOM_DAGSTER_URL"]

    @classmethod
    def build_from_task(cls, task: BaseOperator) -> "CustomAuthTaskProxyOperator":
        # Custom logic to build the operator from the task (task_id should remain the same)
        if task.task_id == "my_task_needs_more_retries":
            return CustomAuthTaskProxyOperator(task_id=task.task_id, retries=3)
        else:
            return CustomAuthTaskProxyOperator(task_id=task.task_id)

proxying_to_dagster(
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    global_vars=globals(),
    build_from_task_fn=CustomAuthTaskProxyOperator.build_from_task,
)

You can do the same for dag-level proxying by providing a custom implementation of the build_from_dag_fn function.

import os
from pathlib import Path

import requests
from airflow.models.dag import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyDAGToDagsterOperator
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

... # Dag code here

class CustomAuthDAGProxyOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Add custom headers to the session
        session = requests.Session()
        session.headers.update({"Authorization": "Bearer my_token"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        # Use a custom environment variable for the dagster url
        return os.environ["CUSTOM_DAGSTER_URL"]

    @classmethod
    def build_from_dag(cls, dag: DAG) -> "CustomAuthDAGProxyOperator":
        # Custom logic to build the operator from the dag (DAG id should remain the same)
        if dag.dag_id == "my_dag_needs_more_retries":
            return CustomAuthDAGProxyOperator(task_id="custom_override", retries=3, dag=dag)
        else:
            return CustomAuthDAGProxyOperator(task_id="basic_override", dag=dag)

proxying_to_dagster(
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    global_vars=globals(),
    build_from_dag_fn=CustomAuthDAGProxyOperator.build_from_dag,
)

class dagster_airlift.in_airflow.BaseDagsterAssetsOperator(*args, **kwargs)[source]

Interface for an operator which materializes dagster assets.

This operator needs to implement the following methods:

  • get_dagster_session: Returns a requests session that can be used to make requests to the Dagster API. This is where any additional authentication can be added.

  • get_dagster_url: Returns the URL for the Dagster instance.

  • filter_asset_nodes: Filters asset nodes (which are returned from Dagster’s graphql API) to only include those that should be triggered by the current task.

Optionally, these methods can be overridden as well:

  • get_partition_key: Determines the partition key to use to trigger the dagster run. This method will only be called if the underlying asset is partitioned.
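
A minimal sketch of a subclass; the filter_asset_nodes signature and the GraphQL node shape (assetKey.path) are assumptions for illustration:

import os
from typing import Any, Iterable, Mapping, Sequence

import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseDagsterAssetsOperator


class MyAssetsOperator(BaseDagsterAssetsOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        return requests.Session()

    def get_dagster_url(self, context: Context) -> str:
        return os.environ["DAGSTER_URL"]

    def filter_asset_nodes(
        self, context: Context, asset_nodes: Sequence[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        # Trigger only assets whose key path begins with this task's id (illustrative).
        return [
            node for node in asset_nodes if node["assetKey"]["path"][0] == self.task_id
        ]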

dagster_airlift.in_airflow.load_proxied_state_from_yaml(proxied_yaml_path)[source]

Loads the proxied state from a directory of yaml files.

Expects the directory to contain yaml files, where each file corresponds to the id of a dag (i.e. dag_id.yaml). This directory is typically constructed using the dagster-airlift CLI:

AIRFLOW_HOME=... dagster-airlift proxy scaffold

Each file should have one of the following structures. In the case of task-level proxying:

tasks:
    - id: task_id
      proxied: true
    - id: task_id
      proxied: false

In the case of dag-level proxying:

proxied: true

Parameters:

proxied_yaml_path (Path) – The path to the directory containing the yaml files.

Returns:

The proxied state of the dags and tasks in Airflow.

Return type:

AirflowProxiedState

Proxying State

class dagster_airlift.in_airflow.AirflowProxiedState(dags)[source]

A class to store the proxied state of dags and tasks in Airflow. Typically, this is constructed by load_proxied_state_from_yaml().

Parameters:

dags (Dict[str, DagProxiedState]) – A dictionary of dag_id to DagProxiedState.

class dagster_airlift.in_airflow.DagProxiedState(proxied, tasks)[source]

A class to store the proxied state of tasks in a dag.

Parameters:
  • tasks (Dict[str, TaskProxiedState]) – A dictionary of task_id to TaskProxiedState. If the entire dag is proxied, or proxied state is not set for a task, the task_id will not be present in this dictionary.

  • proxied (Optional[bool]) – A boolean indicating whether the entire dag is proxied. If this is None, then the dag proxies at the task level (or proxying state has not been set at all).

class dagster_airlift.in_airflow.TaskProxiedState(task_id, proxied)[source]

A class to store the proxied state of a task.

Parameters:
  • task_id (str) – The id of the task.

  • proxied (bool) – A boolean indicating whether the task is proxied.
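
For illustration, the same state can be constructed programmatically rather than from yaml (the ids are hypothetical):

from dagster_airlift.in_airflow import (
    AirflowProxiedState,
    DagProxiedState,
    TaskProxiedState,
)

# Equivalent to a my_dag.yaml file with a single task-level proxying entry.
proxied_state = AirflowProxiedState(
    dags={
        "my_dag": DagProxiedState(
            proxied=None,  # None: proxying is decided per task below
            tasks={"my_task": TaskProxiedState(task_id="my_task", proxied=True)},
        )
    }
)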

Task-level Proxying

class dagster_airlift.in_airflow.BaseProxyTaskToDagsterOperator(*args, **kwargs)[source]

An operator that proxies task execution to Dagster assets whose metadata maps to this task’s dag ID and task ID.

For the DAG ID and task ID that this operator proxies, it expects there to be corresponding assets in the linked Dagster deployment that have metadata entries with the key dagster-airlift/task-mapping that map to this DAG ID and task ID. This metadata is typically set using the dagster_airlift.core.assets_with_task_mappings() function.

The following methods must be implemented by subclasses:

  • get_dagster_session: Returns a requests session that can be used to make requests to the Dagster API. This is where any additional authentication can be added.

  • get_dagster_url: Returns the URL for the Dagster instance.

There is a default implementation of this operator, DefaultProxyTaskToDagsterOperator, which is used by proxying_to_dagster() if no override operator is provided.

class dagster_airlift.in_airflow.DefaultProxyTaskToDagsterOperator(*args, **kwargs)[source]

The default task proxying operator. It opens a blank session and expects the Dagster URL to be set in the environment variable DAGSTER_URL.

This operator should not be instantiated directly - it is instantiated by proxying_to_dagster() if no override operator is provided.

Dag-level Proxying

class dagster_airlift.in_airflow.BaseProxyDAGToDagsterOperator(*args, **kwargs)[source]

An operator base class that proxies the entire DAG’s execution to Dagster assets whose metadata maps to the DAG id used by this task.

For the Dag ID that this operator proxies, it expects there to be corresponding assets in the linked Dagster deployment that have metadata entries with the key dagster-airlift/dag-mapping that map to this Dag ID. This metadata is typically set using the dagster_airlift.core.assets_with_dag_mappings() function.

The following methods must be implemented by subclasses:

  • get_dagster_session: Returns a requests session that can be used to make requests to the Dagster API. This is where any additional authentication can be added.

  • get_dagster_url: Returns the URL for the Dagster instance.

There is a default implementation of this operator, DefaultProxyDAGToDagsterOperator, which is used by proxying_to_dagster() if no override operator is provided.

class dagster_airlift.in_airflow.DefaultProxyDAGToDagsterOperator(*args, **kwargs)[source]

The default dag-level proxying operator. It opens a blank session and expects the Dagster URL to be set in the environment variable DAGSTER_URL.

This operator should not be instantiated directly - it is instantiated by proxying_to_dagster() if no override operator is provided.